{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "Tce3stUlHN0L"
      },
      "source": [
        "##### Copyright 2019 The TensorFlow Authors."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "cellView": "form",
        "id": "tuOe1ymfHZPu"
      },
      "outputs": [],
      "source": [
        "#@title Licensed under the Apache License, Version 2.0 (the \"License\");\n",
        "# you may not use this file except in compliance with the License.\n",
        "# You may obtain a copy of the License at\n",
        "#\n",
        "# https://www.apache.org/licenses/LICENSE-2.0\n",
        "#\n",
        "# Unless required by applicable law or agreed to in writing, software\n",
        "# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
        "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
        "# See the License for the specific language governing permissions and\n",
        "# limitations under the License."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "qFdPvlXBOdUN"
      },
      "source": [
        "# tf.data API によるパフォーマンスの改善"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "MfBg1C5NB3X0"
      },
      "source": [
        "<table class=\"tfo-notebook-buttons\" align=\"left\">\n",
        "  <td><a target=\"_blank\" href=\"https://www.tensorflow.org/guide/data_performance\"><img src=\"https://www.tensorflow.org/images/tf_logo_32px.png\">TensorFlow.org で表示</a></td>\n",
        "  <td><a target=\"_blank\" href=\"https://colab.research.google.com/github/tensorflow/docs-l10n/blob/master/site/ja/guide/data_performance.ipynb\"><img src=\"https://www.tensorflow.org/images/colab_logo_32px.png\">Google Colab で実行</a></td>\n",
        "  <td><a target=\"_blank\" href=\"https://github.com/tensorflow/docs-l10n/blob/master/site/ja/guide/data_performance.ipynb\"><img src=\"https://www.tensorflow.org/images/GitHub-Mark-32px.png\">GitHub でソースを表示</a></td>\n",
        "  <td><a href=\"https://storage.googleapis.com/tensorflow_docs/docs-l10n/site/ja/guide/data_performance.ipynb\"><img src=\"https://www.tensorflow.org/images/download_logo_32px.png\">ノートブックをダウンロード</a></td>\n",
        "</table>"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "xHxb-dlhMIzW"
      },
      "source": [
        "## 概要\n",
        "\n",
        "GPU と TPU は、単一のトレーニングステップを実行するために必要な時間を劇的に短縮することができます。ピークパフォーマンスの達成には、現在のステップが終了する前に、次のステップのデータを配信する有効な入力パイプラインが必要となります。柔軟で効率的な入力パイプラインの構築に役立つのが、`tf.data` API です。このドキュメントでは、`tf.data` API を使用して非常に性能の高い TensorFlow 入力パイプラインを構築する方法を説明します。\n",
        "\n",
        "読み進める前に、「[TensorFlow 入力パイプラインの構築](./data.ipynb)」ガイドに目を通し、`tf.data` API の使用方法を学習してください。"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "UhNtHfuxCGVy"
      },
      "source": [
        "## リソース\n",
        "\n",
        "- [TensorFlow 入力パイプラインの構築](./data.ipynb)\n",
        "- `tf.data.Dataset` API\n",
        "- <a>TensorFlow プロファイラを使用した <code>tf.data</code> パフォーマンスの分析</a>"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "MUXex9ctTuDB"
      },
      "source": [
        "## セットアップ"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "IqR2PQG4ZaZ0"
      },
      "outputs": [],
      "source": [
        "import tensorflow as tf\n",
        "\n",
        "import time"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "QthTHCKF-jKD"
      },
      "source": [
        "このガイドでは、データセットをイテレートし、パフォーマンスを測定します。次のようなさまざまな要因の影響により、再現可能なパフォーマンスベンチマークを作成することが困難となる場合があります。\n",
        "\n",
        "- 現在の CPU 負荷\n",
        "- ネットワークトラフィック\n",
        "- キャッシュなどの複雑な仕組み\n",
        "\n",
        "そのため、再現可能なベンチマークを提供するために、人工的な例を構築します。"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "3bU5gsSI-jKF"
      },
      "source": [
        "### データセット\n",
        "\n",
        "`ArtificialDataset` という、`tf.data.Dataset` から継承するクラスを定義します。このデータセットは次のことを行います。\n",
        "\n",
        "- `num_samples` サンプルを生成する（デフォルトは 3）\n",
        "- ファイルを開くアクションをシミュレーションするために、最初のアイテムの前にしばらくスリープする\n",
        "- ファイルからデータを読み取るアクションをシミュレーションするために、各アイテムを生成する前にしばらくスリープする"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "zUQv4kCd-jKH"
      },
      "outputs": [],
      "source": [
        "class ArtificialDataset(tf.data.Dataset):\n",
        "    def _generator(num_samples):\n",
        "        # Opening the file\n",
        "        time.sleep(0.03)\n",
        "        \n",
        "        for sample_idx in range(num_samples):\n",
        "            # Reading data (line, record) from the file\n",
        "            time.sleep(0.015)\n",
        "            \n",
        "            yield (sample_idx,)\n",
        "    \n",
        "    def __new__(cls, num_samples=3):\n",
        "        return tf.data.Dataset.from_generator(\n",
        "            cls._generator,\n",
        "            output_types=tf.dtypes.int64,\n",
        "            output_shapes=(1,),\n",
        "            args=(num_samples,)\n",
        "        )"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "O9y1WjNv-jKL"
      },
      "source": [
        "このデータセットは `tf.data.Dataset.range` に似ており、各サンプルの開始とサンプル間に一定の遅延を追加します。"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "FGK1Y4jn-jKM"
      },
      "source": [
        "### トレーニングループ\n",
        "\n",
        "データセットのイテレートにどれくらいの時間がかかるかを測定するダミーのトレーニングループを記述します。トレーニング時間がシミュレーションされます。"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "MIaM3u00-jKP"
      },
      "outputs": [],
      "source": [
        "def benchmark(dataset, num_epochs=2):\n",
        "    start_time = time.perf_counter()\n",
        "    for epoch_num in range(num_epochs):\n",
        "        for sample in dataset:\n",
        "            # Performing a training step\n",
        "            time.sleep(0.01)\n",
        "    tf.print(\"Execution time:\", time.perf_counter() - start_time)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "KK58SuXS-jKT"
      },
      "source": [
        "## パフォーマンスの最適化\n",
        "\n",
        "パフォーマンスをどのように最適化できるかを示すために、`ArtificialDataset` のパフォーマンスを改善します。"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "Xi8t26y7-jKV"
      },
      "source": [
        "### 単純なアプローチ\n",
        "\n",
        "コツを使わずに、単純なパイプラインから始め、ありのままのデータセットをイテレートします。"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "_gP7J1y4-jKY"
      },
      "outputs": [],
      "source": [
        "benchmark(ArtificialDataset())"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "Lxeat5dH-jKf"
      },
      "source": [
        "内部的には、次のように実行時間が使われています。\n",
        "\n",
        "![Prefetched](https://www.tensorflow.org/guide/images/data_performance/prefetched.svg)\n",
        "\n",
        "トレーニングステップの実行には、次のアクションが伴うことがわかります。\n",
        "\n",
        "- ファイルが開いていない場合は、ファイルを開く\n",
        "- ファイルからデータをフェッチする\n",
        "- トレーニングにデータを使用する\n",
        "\n",
        "ところが、このように単純な同期実装では、パイプラインがデータをフェッチしている間、モデルはアイドル状態となります。その反対に、モデルがトレーニング中である場合、入力パイプラインがアイドル状態となります。したがって、トレーニングのステップ時間は、開いて、読み取り、トレーニングする時間すべての和であるということになります。\n",
        "\n",
        "次のセクションでは、この入力パイプラインに基づいて構築し、性能の高い TensorFlow 入力パイプライン設計のベストプラクティスを説明します。"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "mfukBGNz-jKh"
      },
      "source": [
        "### プリフェッチ\n",
        "\n",
        "プリフェッチは、トレーニングステップの事前処理とモデルの実行に重なって行われます。モデルがトレーニングステップ `s` を実行する間、入力パイプラインはステップ `s+1` のデータを読み取っています。そうすることで、ステップ時間をトレーニングと、データの抽出にかかる時間の最大時間（和とは反対に）に減少させることができます。\n",
        "\n",
        "`tf.data` API は、`tf.data.Dataset.prefetch` 変換を提供します。データが生成された時間をデータが消費された時間から切り離すために使用できます。具体的には、この変換は、バックグラウンドのスレッドと内部バッファを使用して、要求される前に入力データセットから要素をプリフェッチします。プリフェッチする要素の数は、単一のトレーニングステップによって消費されるバッチの数と同等（またはそれ以上）である必要があります。この値を手動で調整するか、`tf.data.experimental.AUTOTUNE` に設定することができますが、後者の場合、`tf.data` ランタイムによって、ランタイム時に動的に値が調整されます。\n",
        "\n",
        "プリフェッチ変換は、「プロデューサ」の作業と「コンシューマ」の作業をオーバーラップする機会があればいつでもオーバーラップさせることに注意してください。"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "DHpUVqH1-jKi"
      },
      "outputs": [],
      "source": [
        "benchmark(\n",
        "    ArtificialDataset()\n",
        "    .prefetch(tf.data.experimental.AUTOTUNE)\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "h7z_kzo--jKn"
      },
      "source": [
        "![Sequential interleave](https://www.tensorflow.org/guide/images/data_performance/sequential_interleave.svg)\n",
        "\n",
        "今度は、サンプル 0 でトレーニングセットアップが実行している間、入力パイプラインはサンプル 1 のデータを読み取っているのがわかります。"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "52QMKfaY-jKq"
      },
      "source": [
        "### データ抽出の並列化\n",
        "\n",
        "実世界の状況では、入力データはリモート（GCS や HDFS など）に保管されていることがあります。ローカルとリモートのストレージには、次のような違いがあるため、ローカルでのデータ読み取りに適したデータセットパイプラインは、リモートで読み取られる際にボトルネックとなる可能性があります。\n",
        "\n",
        "- **最初のバイトまでの時間:** リモートストレージからファイルの最初のバイトを読み取る場合、ロカールストレージからよりもずっと長い時間がかかります。\n",
        "- **読み取りのスループット:** リモートストレージの総帯域幅は一般的に大きいため、単一のファイルの読み取りには、この帯域幅のほんのわずかしか使用されません。\n",
        "\n",
        "さらに、生のバイトがメモリに読み込まれると、データのデシリアライズや復号化する必要も出てくるため（[protobuf](https://developers.google.com/protocol-buffers/) など）、さらに計算が必要となります。このオーバーヘッドは、データの格納場所がローカルであるかリモートであるかに関係なく存在しますが、データのプリフェッチが効果的に行われない場合、リモートの場合に大きくなることがあります。\n",
        "\n",
        "データ抽出にまつわるさまざまなオーバーヘッドの影響を緩和するために、`tf.data.Dataset.interleave` 変換を使用して、データの読み込みステップをほかのデータセットのコンテンツ（データファイルリーダーなど）とインターリーブしながら並列化することができます。オーバーラップするデータセットの数は、`cycle_length` 引数で指定し、並列化のレベルは `num_parallel_calls` 引数で指定することができます。`prefetch` 変換と同様に、`interleave` 変換も `tf.data.experimental.AUTOTUNE` をサポートしているため、どのレベルの並列化を使用するかという判断は `tf.data` ランタイムに委ねられます。"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "gs8O8Vbu-jKu"
      },
      "source": [
        "#### 順次インターリーブ\n",
        "\n",
        "`tf.data.Dataset.interleave` 変換のデフォルトの引数によって、2 つのデータセットからの単一のサンプルが順次、インターリブされます。"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "fDH12GiK-jKw"
      },
      "outputs": [],
      "source": [
        "benchmark(\n",
        "    tf.data.Dataset.range(2)\n",
        "    .interleave(ArtificialDataset)\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "78CsSOnf-jK0"
      },
      "source": [
        "![Parallel interleave](https://www.tensorflow.org/guide/images/data_performance/parallel_interleave.svg)\n",
        "\n",
        "この図は、`interleave` 変換の動作を示しており、利用できる 2 つのデータセットからサンプルが交互にフェッチされています。ただし、ここでは、パフォーマンスの改善は認められません。"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "j3cqqmYl-jK2"
      },
      "source": [
        "#### 並列インターリーブ\n",
        "\n",
        "では、`interleave` 変換の `num_parallel_calls` 引数を使用してみましょう。これは、複数のデータセットを並列して読み込むため、ファイルが開かれるまでの待機時間が短縮されます。"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "a3FQcTPY-jK4"
      },
      "outputs": [],
      "source": [
        "benchmark(\n",
        "    tf.data.Dataset.range(2)\n",
        "    .interleave(\n",
        "        ArtificialDataset,\n",
        "        num_parallel_calls=tf.data.experimental.AUTOTUNE\n",
        "    )\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "RxRLPB6C-jLA"
      },
      "source": [
        "![Sequential mapping](https://www.tensorflow.org/guide/images/data_performance/sequential_map.svg)\n",
        "\n",
        "今度は、2 つのデータセットの読み取りが並列化され、総合的なデータ処理時間が短縮されています。"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "5ZCLFWyv-jLB"
      },
      "source": [
        "### データ変換の並列化\n",
        "\n",
        "データを準備する際、入力要素を事前処理する必要がある場合があります。この目的により、`tf.data` API は、ユーザー定義関数を入力データセットの各要素に適用する `tf.data.Dataset.map` 変換を提供しています。入力要素は互いに独立しているため、複数の CPU コアで事前処理を並列化することができます。これを行うために、`prefetch` と `interleave` 変換と同様に、`map` 変換でも `num_parallel_calls` 引数によって並列化のレベルを指定することができます。\n",
        "\n",
        "`num_parallel_calls` 引数に最適な値を選択するには、ハードウェア、トレーニングデータの特性（サイズや形状など）、マップ関数のコスト、および CPU で同時に発生しているほかの処理を考慮する必要があります。簡単な調べ方は、利用可能な CPU コアの数を使用することですが、`prefetch` と `interleave` 変換に関して言えば、`map` 変換は `tf.data.experimental.AUTOTUNE` をサポートしているため、どのレベルの並列化を使用するかという判断は `tf.data` ランタイムに委ねられています。"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "GSkKetpx-jLD"
      },
      "outputs": [],
      "source": [
        "def mapped_function(s):\n",
        "    # Do some hard pre-processing\n",
        "    tf.py_function(lambda: time.sleep(0.03), [], ())\n",
        "    return s"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "wiU7W_QC-jLI"
      },
      "source": [
        "#### 順次マッピング\n",
        "\n",
        "基本の例として、並列化を使用せずに `map` 変換を使用することから始めてみましょう。"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "ZSBvDpJG-jLL"
      },
      "outputs": [],
      "source": [
        "benchmark(\n",
        "    ArtificialDataset()\n",
        "    .map(mapped_function)\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "ngwMTDb6-jLR"
      },
      "source": [
        "![Sequential mapping](https://www.tensorflow.org/guide/images/data_performance/sequential_map.svg)\n",
        "\n",
        "[単純なアプローチ](#The-naive-approach)について言えば、ステップを開いて読み取り、事前処理（マッピング）を行ってトレーニングする時間が、単一のイテレーションの総和となります。"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "U-10PE1D-jLU"
      },
      "source": [
        "#### 並列マッピング\n",
        "\n",
        "では、同じ事前処理関数を使用して、複数のサンプルで並列に適用してみましょう。"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "F8AYLZbg-jLV"
      },
      "outputs": [],
      "source": [
        "benchmark(\n",
        "    ArtificialDataset()\n",
        "    .map(\n",
        "        mapped_function,\n",
        "        num_parallel_calls=tf.data.experimental.AUTOTUNE\n",
        "    )\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "-MoJklzP-jLe"
      },
      "source": [
        "![Parallel mapping](https://www.tensorflow.org/guide/images/data_performance/parallel_map.svg)\n",
        "\n",
        "この図から、事前処理ステップがオーバーラップしたことで、単一のイテレーションにかかる総合時間が短縮されたことがわかります。"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "ZY1Q9kJO-jLh"
      },
      "source": [
        "### キャッシング\n",
        "\n",
        "`tf.data.Dataset.cache` 変換は、メモリまたはローカルストレージのいずれかに、データセットをキャッシュすることができるため、各エポック中に一部の操作（ファイルを開いてデータを読み取るなど）が実行されなくなります。"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "xieLApaI-jLi"
      },
      "outputs": [],
      "source": [
        "benchmark(\n",
        "    ArtificialDataset()\n",
        "    .map(  # Apply time consuming operations before cache\n",
        "        mapped_function\n",
        "    ).cache(\n",
        "    ),\n",
        "    5\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "KeMgW9XI-jLn"
      },
      "source": [
        "![Prefetched](https://www.tensorflow.org/guide/images/data_performance/prefetched.svg)\n",
        "\n",
        "データセットをキャッシュすると、`cache` 1 の前の変換（ファイルを開いてデータを読み取るなど）は、最初のエポックにのみ実行され、次のエポックは、`cache` 変換によってキャッシュされたデータを再利用するようになります。\n",
        "\n",
        "`map` 変換に渡されるユーザー定義関数が高くつく場合は、`map` 変換の後に `cache` 変換を適用することができますが、これは、キャッシュされるデータセットがメモリやローカルストレージにまだ格納できる場合に限ります。ユーザー定義関数によってデータセットを格納するために必要な容量がキャッシュのキャパシティを超えるほど増加する場合は、`cache` 変換の後に適用するようにするか、トレーニングジョブの前にデータを事前処理することでリソースの使用率を抑えることを検討してください。"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "i3NtGI3r-jLp"
      },
      "source": [
        "### マッピングのベクトル化\n",
        "\n",
        "`map` 変換に渡されたユーザー定義関数を呼び出すと、ユーザー定義関数のスケジューリングと実行に関連するオーバーヘッドが生じます。ユーザー定義関数をベクトル化し（1 つの入力バッチでまとめて操作させる）、`map` 変換の*前*に `batch` 変換を適用することをお勧めします。\n",
        "\n",
        "これに適した実践を示すには、artificial データセットは適していません。スケジューリングの遅延は約 10 マイクロ秒（10e-6 秒）であり、`ArtificialDataset` で使用される数十ミリ秒よりはるかに短いため、その影響がわかりづらいからです。\n",
        "\n",
        "この例では、基本の `tf.data.Dataset.range` 関数を使用し、トレーニングループを最も単純な形態まで単純化します。"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "xqtiYPmb-jLt"
      },
      "outputs": [],
      "source": [
        "fast_dataset = tf.data.Dataset.range(10000)\n",
        "\n",
        "def fast_benchmark(dataset, num_epochs=2):\n",
        "    start_time = time.perf_counter()\n",
        "    for _ in tf.data.Dataset.range(num_epochs):\n",
        "        for _ in dataset:\n",
        "            pass\n",
        "    tf.print(\"Execution time:\", time.perf_counter() - start_time)\n",
        "    \n",
        "def increment(x):\n",
        "    return x+1"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "Fj2gmsMT-jL5"
      },
      "source": [
        "#### スカラマッピング"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "Imn3SslJ-jMA"
      },
      "outputs": [],
      "source": [
        "fast_benchmark(\n",
        "    fast_dataset\n",
        "    # Apply function one item at a time\n",
        "    .map(increment)\n",
        "    # Batch\n",
        "    .batch(256)\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "BWUNbPqv-jMF"
      },
      "source": [
        "![Scalar map](https://www.tensorflow.org/guide/images/data_performance/scalar_map.svg)\n",
        "\n",
        "上の図は、何が起きているかを示しています（より少ないサンプル数で）。マッピングされた関数が各サンプルに適用されているのがわかります。この関数は非常に高速ですが、時間パフォーマンスに影響するオーバーヘッドがあります。"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "tDVSM0A--jMG"
      },
      "source": [
        "#### ベクトル化されたマッピング"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "nAw1mDLw-jMI"
      },
      "outputs": [],
      "source": [
        "fast_benchmark(\n",
        "    fast_dataset\n",
        "    .batch(256)\n",
        "    # Apply function on a batch of items\n",
        "    # The tf.Tensor.__add__ method already handle batches\n",
        "    .map(increment)\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "DbMteMY9-jMO"
      },
      "source": [
        "![Vectorized map](https://www.tensorflow.org/guide/images/data_performance/vectorized_map.svg)\n",
        "\n",
        "今度は、マッピングされた関数は一度だけ呼び出され、サンプルのバッチに適用されています。関数の実行にかかる時間は長くなりますが、オーバーヘッドの発生は一度だけであり、総合的な時間パフォーマンスが改善されています。"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "hfueG0Wj-jMR"
      },
      "source": [
        "### メモリフットプリントの縮小\n",
        "\n",
        "`interleave`、`prefetch`、および `shuffle` といった多数の変換は、要素の内部バッファにとどまります。`map` 変換に渡されるユーザー定義関数が要素のサイズを変更すると、map 変換の順序付けと、要素をバッファリングする変換によって、メモリ使用率に影響が及びます。通常、パフォーマンスの目的でほかの順序が求められない限り、メモリフットプリントがより少なくなる順序を選択することをお勧めしています。\n",
        "\n",
        "#### 部分計算のキャッシング\n",
        "\n",
        "メモリに入りきれないほどのデータに増加する場合を除き、`map` 変換の後にデータセットをキャッシュすることが推奨されます。マッピングされた関数を、時間を消費するものとメモリを消費するものの 2 つに分割できれば、トレードオフを解消することができます。この場合、次のように変換をつなぐことができます。\n",
        "\n",
        "```python\n",
        "dataset.map(time_consuming_mapping).cache().map(memory_consuming_mapping)\n",
        "```\n",
        "\n",
        "こうすることで、時間を消費する部分は最初のエポック中にのみ実行されるようになるため、キャッシュスペースを使いすぎなくて済みます。"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "MYOHG69M-jMT"
      },
      "source": [
        "## ベストプラクティスのまとめ\n",
        "\n",
        "性能の高い TensorFlow 入力パイプライン設計のベストプラクティスをまとめてましょう。\n",
        "\n",
        "- [`prefetch` 変換を使用](#Pipelining)して、プロデューサとコンシューマの作業をオーバーラップさせる。\n",
        "-  `interleave` 変換を使用して、[データの読み取り変換を並列化](#Parallelizing-data-extraction)する。\n",
        "- `num_parallel_calls` 引数を設定して、[`map` 変換を並列化](#Parallelizing-data-transformation)する。\n",
        "- [`cache` 変換を使用](#Caching)して、最初のエポック中にデータをメモリにキャッシュする。\n",
        "- `map` 変換に渡される[ユーザー定義関数をベクトル化](#Map-and-batch)する。\n",
        "- `interleave`、`prefetch`、および `shuffle` 変換を適用する際に、[メモリ使用率を低減](#Reducing-memory-footprint)する。"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "mP_EMFsQ-jMU"
      },
      "source": [
        "## 数値の再現\n",
        "\n",
        "注意: これ以降のノートブックでは、上記の数値を再現する方法を説明しています。このコードを自由に調整してかまいませんが、このチュートリアルの要点ではないことに留意してください。\n",
        "\n",
        "`tf.data.Dataset` API の理解をさらに深めるには、独自のパイプラインで調整を試すのがよいでしょう。以下は、このガイドの画像を作成するために使用したコードです。次のような一般的な課題の回避策を示しているため、出発点にはご利用ください。\n",
        "\n",
        "- 実行時間の再現可能性\n",
        "- マッピングされた関数の Eager Execution\n",
        "- `interleave` 変換のコーラブル"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "7M_jFLer-jMV"
      },
      "outputs": [],
      "source": [
        "import itertools\n",
        "from collections import defaultdict\n",
        "\n",
        "import numpy as np\n",
        "import matplotlib as mpl\n",
        "import matplotlib.pyplot as plt"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "Z3pjnxtK-jMa"
      },
      "source": [
        "### データセット\n",
        "\n",
        "`ArtificialDataset` と同様に、各ステップにかかった時間を返すデータセットを構築できます。"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "OgGl4U7t-jMc"
      },
      "outputs": [],
      "source": [
        "class TimeMeasuredDataset(tf.data.Dataset):\n",
        "    # OUTPUT: (steps, timings, counters)\n",
        "    OUTPUT_TYPES = (tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32)\n",
        "    OUTPUT_SHAPES = ((2, 1), (2, 2), (2, 3))\n",
        "    \n",
        "    _INSTANCES_COUNTER = itertools.count()  # Number of datasets generated\n",
        "    _EPOCHS_COUNTER = defaultdict(itertools.count)  # Number of epochs done for each dataset\n",
        "    \n",
        "    def _generator(instance_idx, num_samples):\n",
        "        epoch_idx = next(TimeMeasuredDataset._EPOCHS_COUNTER[instance_idx])\n",
        "        \n",
        "        # Opening the file\n",
        "        open_enter = time.perf_counter()\n",
        "        time.sleep(0.03)\n",
        "        open_elapsed = time.perf_counter() - open_enter\n",
        "        \n",
        "        for sample_idx in range(num_samples):\n",
        "            # Reading data (line, record) from the file\n",
        "            read_enter = time.perf_counter()\n",
        "            time.sleep(0.015)\n",
        "            read_elapsed = time.perf_counter() - read_enter\n",
        "            \n",
        "            yield (\n",
        "                [(\"Open\",), (\"Read\",)],\n",
        "                [(open_enter, open_elapsed), (read_enter, read_elapsed)],\n",
        "                [(instance_idx, epoch_idx, -1), (instance_idx, epoch_idx, sample_idx)]\n",
        "            )\n",
        "            open_enter, open_elapsed = -1., -1.  # Negative values will be filtered\n",
        "            \n",
        "    \n",
        "    def __new__(cls, num_samples=3):\n",
        "        return tf.data.Dataset.from_generator(\n",
        "            cls._generator,\n",
        "            output_types=cls.OUTPUT_TYPES,\n",
        "            output_shapes=cls.OUTPUT_SHAPES,\n",
        "            args=(next(cls._INSTANCES_COUNTER), num_samples)\n",
        "        )"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "YQqDP4jk-jMj"
      },
      "source": [
        "このデータセットは、形状 `[[2, 1], [2, 2], [2, 3]]` と型 `[tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32]` のサンプルを提供します。各サンプルは、次のとおりです。\n",
        "\n",
        "```\n",
        "(   [(\"Open\"), (\"Read\")],   [(t0, d), (t0, d)],   [(i, e, -1), (i, e, s)] )\n",
        "```\n",
        "\n",
        "次のように解釈してください。\n",
        "\n",
        "- `Open` と `Read` はステップ識別子\n",
        "- `t0` は、対応するステップが開始した時間のタイムスタンプ\n",
        "- `d` は、対応するステップにかかった時間\n",
        "- `i` はインスタンスのインデックス\n",
        "- `e` はエポックのインデックス（データセットがイテレートした回数）\n",
        "- `s` はサンプルのインデックス"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "IQK913bB-jMm"
      },
      "source": [
        "### イテレーションループ\n",
        "\n",
        "すべてのタイミングを収集できるように、イテレーションループを多少複雑化するとよいでしょう。これは、上記に説明したサンプルを生成するデータセットでのみ機能します。"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "zAy-K_Cq-jMn"
      },
      "outputs": [],
      "source": [
        "def timelined_benchmark(dataset, num_epochs=2):\n",
        "    # Initialize accumulators\n",
        "    steps_acc = tf.zeros([0, 1], dtype=tf.dtypes.string)\n",
        "    times_acc = tf.zeros([0, 2], dtype=tf.dtypes.float32)\n",
        "    values_acc = tf.zeros([0, 3], dtype=tf.dtypes.int32)\n",
        "    \n",
        "    start_time = time.perf_counter()\n",
        "    for epoch_num in range(num_epochs):\n",
        "        epoch_enter = time.perf_counter()\n",
        "        for (steps, times, values) in dataset:\n",
        "            # Record dataset preparation informations\n",
        "            steps_acc = tf.concat((steps_acc, steps), axis=0)\n",
        "            times_acc = tf.concat((times_acc, times), axis=0)\n",
        "            values_acc = tf.concat((values_acc, values), axis=0)\n",
        "            \n",
        "            # Simulate training time\n",
        "            train_enter = time.perf_counter()\n",
        "            time.sleep(0.01)\n",
        "            train_elapsed = time.perf_counter() - train_enter\n",
        "            \n",
        "            # Record training informations\n",
        "            steps_acc = tf.concat((steps_acc, [[\"Train\"]]), axis=0)\n",
        "            times_acc = tf.concat((times_acc, [(train_enter, train_elapsed)]), axis=0)\n",
        "            values_acc = tf.concat((values_acc, [values[-1]]), axis=0)\n",
        "        \n",
        "        epoch_elapsed = time.perf_counter() - epoch_enter\n",
        "        # Record epoch informations\n",
        "        steps_acc = tf.concat((steps_acc, [[\"Epoch\"]]), axis=0)\n",
        "        times_acc = tf.concat((times_acc, [(epoch_enter, epoch_elapsed)]), axis=0)\n",
        "        values_acc = tf.concat((values_acc, [[-1, epoch_num, -1]]), axis=0)\n",
        "        time.sleep(0.001)\n",
        "    \n",
        "    tf.print(\"Execution time:\", time.perf_counter() - start_time)\n",
        "    return {\"steps\": steps_acc, \"times\": times_acc, \"values\": values_acc}"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "jw_WSQC8-jMs"
      },
      "source": [
        "### 作図方法\n",
        "\n",
        "最後に、`timelined_benchmark` 関数によって返された値でタイムラインを作図できる関数を定義します。"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "1j73RxiP-jMw"
      },
      "outputs": [],
      "source": [
        "def draw_timeline(timeline, title, width=0.5, annotate=False, save=False):\n",
        "    # Remove invalid entries (negative times, or empty steps) from the timelines\n",
        "    invalid_mask = np.logical_and(timeline['times'] &gt; 0, timeline['steps'] != b'')[:,0]\n",
        "    steps = timeline['steps'][invalid_mask].numpy()\n",
        "    times = timeline['times'][invalid_mask].numpy()\n",
        "    values = timeline['values'][invalid_mask].numpy()\n",
        "    \n",
        "    # Get a set of different steps, ordered by the first time they are encountered\n",
        "    step_ids, indices = np.stack(np.unique(steps, return_index=True))\n",
        "    step_ids = step_ids[np.argsort(indices)]\n",
        "\n",
        "    # Shift the starting time to 0 and compute the maximal time value\n",
        "    min_time = times[:,0].min()\n",
        "    times[:,0] = (times[:,0] - min_time)\n",
        "    end = max(width, (times[:,0]+times[:,1]).max() + 0.01)\n",
        "    \n",
        "    cmap = mpl.cm.get_cmap(\"plasma\")\n",
        "    plt.close()\n",
        "    fig, axs = plt.subplots(len(step_ids), sharex=True, gridspec_kw={'hspace': 0})\n",
        "    fig.suptitle(title)\n",
        "    fig.set_size_inches(17.0, len(step_ids))\n",
        "    plt.xlim(-0.01, end)\n",
        "    \n",
        "    for i, step in enumerate(step_ids):\n",
        "        step_name = step.decode()\n",
        "        ax = axs[i]\n",
        "        ax.set_ylabel(step_name)\n",
        "        ax.set_ylim(0, 1)\n",
        "        ax.set_yticks([])\n",
        "        ax.set_xlabel(\"time (s)\")\n",
        "        ax.set_xticklabels([])\n",
        "        ax.grid(which=\"both\", axis=\"x\", color=\"k\", linestyle=\":\")\n",
        "        \n",
        "        # Get timings and annotation for the given step\n",
        "        entries_mask = np.squeeze(steps==step)\n",
        "        serie = np.unique(times[entries_mask], axis=0)\n",
        "        annotations = values[entries_mask]\n",
        "        \n",
        "        ax.broken_barh(serie, (0, 1), color=cmap(i / len(step_ids)), linewidth=1, alpha=0.66)\n",
        "        if annotate:\n",
        "            for j, (start, width) in enumerate(serie):\n",
        "                annotation = \"\\n\".join([f\"{l}: {v}\" for l,v in zip((\"i\", \"e\", \"s\"), annotations[j])])\n",
        "                ax.text(start + 0.001 + (0.001 * (j % 2)), 0.55 - (0.1 * (j % 2)), annotation,\n",
        "                        horizontalalignment='left', verticalalignment='center')\n",
        "    if save:\n",
        "        plt.savefig(title.lower().translate(str.maketrans(\" \", \"_\")) + \".svg\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "xto6GNdO-jM1"
      },
      "source": [
        "### マッピングされた関数にラッパーを使用\n",
        "\n",
        "マッピングされた関数を Eager コンテキストで実行するには、それらを`tf.py_function` 呼び出し内にラップする必要があります。"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "39v7JD4L-jM2"
      },
      "outputs": [],
      "source": [
        "def map_decorator(func):\n",
        "    def wrapper(steps, times, values):\n",
        "        # Use a tf.py_function to prevent auto-graph from compiling the method\n",
        "        return tf.py_function(\n",
        "            func,\n",
        "            inp=(steps, times, values),\n",
        "            Tout=(steps.dtype, times.dtype, values.dtype)\n",
        "        )\n",
        "    return wrapper"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "7eJRCinb-jM5"
      },
      "source": [
        "### パイプラインの比較"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "YwX4ndHE-jM6"
      },
      "outputs": [],
      "source": [
        "_batch_map_num_items = 50\n",
        "\n",
        "def dataset_generator_fun(*args):\n",
        "    return TimeMeasuredDataset(num_samples=_batch_map_num_items)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "EwxJT2aR-jNA"
      },
      "source": [
        "#### 単純"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "wLKgurx_-jNC"
      },
      "outputs": [],
      "source": [
        "@map_decorator\n",
        "def naive_map(steps, times, values):\n",
        "    map_enter = time.perf_counter()\n",
        "    time.sleep(0.001)  # Time consuming step\n",
        "    time.sleep(0.0001)  # Memory consuming step\n",
        "    map_elapsed = time.perf_counter() - map_enter\n",
        "\n",
        "    return (\n",
        "        tf.concat((steps, [[\"Map\"]]), axis=0),\n",
        "        tf.concat((times, [[map_enter, map_elapsed]]), axis=0),\n",
        "        tf.concat((values, [values[-1]]), axis=0)\n",
        "    )\n",
        "\n",
        "naive_timeline = timelined_benchmark(\n",
        "    tf.data.Dataset.range(2)\n",
        "    .flat_map(dataset_generator_fun)\n",
        "    .map(naive_map)\n",
        "    .batch(_batch_map_num_items, drop_remainder=True)\n",
        "    .unbatch(),\n",
        "    5\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "EJqUMDsO-jNG"
      },
      "source": [
        "### 最適化"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "HYHcwabr-jNH"
      },
      "outputs": [],
      "source": [
        "@map_decorator\n",
        "def time_consuming_map(steps, times, values):\n",
        "    map_enter = time.perf_counter()\n",
        "    time.sleep(0.001 * values.shape[0])  # Time consuming step\n",
        "    map_elapsed = time.perf_counter() - map_enter\n",
        "\n",
        "    return (\n",
        "        tf.concat((steps, tf.tile([[[\"1st map\"]]], [steps.shape[0], 1, 1])), axis=1),\n",
        "        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),\n",
        "        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)\n",
        "    )\n",
        "\n",
        "\n",
        "@map_decorator\n",
        "def memory_consuming_map(steps, times, values):\n",
        "    map_enter = time.perf_counter()\n",
        "    time.sleep(0.0001 * values.shape[0])  # Memory consuming step\n",
        "    map_elapsed = time.perf_counter() - map_enter\n",
        "\n",
        "    # Use tf.tile to handle batch dimension\n",
        "    return (\n",
        "        tf.concat((steps, tf.tile([[[\"2nd map\"]]], [steps.shape[0], 1, 1])), axis=1),\n",
        "        tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),\n",
        "        tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)\n",
        "    )\n",
        "\n",
        "\n",
        "optimized_timeline = timelined_benchmark(\n",
        "    tf.data.Dataset.range(2)\n",
        "    .interleave(  # Parallelize data reading\n",
        "        dataset_generator_fun,\n",
        "        num_parallel_calls=tf.data.experimental.AUTOTUNE\n",
        "    )\n",
        "    .batch(  # Vectorize your mapped function\n",
        "        _batch_map_num_items,\n",
        "        drop_remainder=True)\n",
        "    .map(  # Parallelize map transformation\n",
        "        time_consuming_map,\n",
        "        num_parallel_calls=tf.data.experimental.AUTOTUNE\n",
        "    )\n",
        "    .cache()  # Cache data\n",
        "    .map(  # Reduce memory usage\n",
        "        memory_consuming_map,\n",
        "        num_parallel_calls=tf.data.experimental.AUTOTUNE\n",
        "    )\n",
        "    .prefetch(  # Overlap producer and consumer works\n",
        "        tf.data.experimental.AUTOTUNE\n",
        "    )\n",
        "    .unbatch(),\n",
        "    5\n",
        ")"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "b_CSUbxL-jNK"
      },
      "outputs": [],
      "source": [
        "draw_timeline(naive_timeline, \"Naive\", 15)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "DoovY7qr-jNR"
      },
      "outputs": [],
      "source": [
        "draw_timeline(optimized_timeline, \"Optimized\", 15)"
      ]
    }
  ],
  "metadata": {
    "colab": {
      "collapsed_sections": [],
      "name": "data_performance.ipynb",
      "toc_visible": true
    },
    "kernelspec": {
      "display_name": "Python 3",
      "name": "python3"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 0
}
